{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"http://dask.readthedocs.io/en/latest/_images/dask_horizontal.svg\"\n",
    "     align=\"right\"\n",
    "     width=\"30%\"\n",
    "     alt=\"Dask logo\\\">\n",
    "\n",
    "# Dask Arrays - parallelized numpy\n",
    "Parallel, larger-than-memory, n-dimensional array using blocked algorithms. \n",
    "\n",
    "*  **Parallel**: Uses all of the cores on your computer\n",
    "*  **Larger-than-memory**:  Lets you work on datasets that are larger than your available memory by breaking up your array into many small pieces, operating on those pieces in an order that minimizes the memory footprint of your computation, and effectively streaming data from disk.\n",
    "*  **Blocked Algorithms**:  Perform large computations by performing many smaller computations.\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "<img src=\"https://docs.dask.org/en/stable/_images/dask-array.svg\" width=\"40%\" align=\"right\">\n",
    "\n",
    "\n",
    "In other words, Dask Array implements a subset of the NumPy ndarray interface using blocked algorithms, cutting up the large array into many small arrays. This lets us compute on arrays larger than memory using all of our cores. We coordinate these blocked algorithms using Dask graphs.\n",
    "\n",
    "In this notebook, we'll build some understanding by implementing some blocked algorithms from scratch.\n",
    "We'll then use Dask Array to analyze large datasets, in parallel, using a familiar NumPy-like API.\n",
    "\n",
    "**Related Documentation**\n",
    "\n",
    "* [Array documentation](https://docs.dask.org/en/latest/array.html)\n",
    "* [Array screencast](https://youtu.be/9h_61hXCDuI)\n",
    "* [Array API](https://docs.dask.org/en/latest/array-api.html)\n",
    "* [Array examples](https://examples.dask.org/array.html)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create datasets"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Create the datasets you will be using in this notebook:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%run prep.py -d random"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the Client"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client\n",
    "\n",
    "client = Client(n_workers=4)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Blocked Algorithms in a nutshell\n",
    "\n",
    "Let's do side by side the sum of the elements of an array using a NumPy array and a Dask array. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import numpy as np\n",
    "import dask.array as da"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# NumPy array\n",
    "a_np = np.ones(10)\n",
    "a_np"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "We know that we can use `sum()` to compute the sum of the elements of our array, but to show what a blocksized operation would look like, let's do:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a_np_sum = a_np[:5].sum() + a_np[5:].sum()\n",
    "a_np_sum"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Now notice that each sum in the computation above is completely independent so they could be done in parallel. \n",
    "To do this with Dask array, we need to define our \"slices\", we do this by defining the amount of elements we want per block using the variable `chunks`. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a_da = da.ones(10, chunks=5)\n",
    "a_da"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Important!**\n",
    "\n",
    "Note here that to get two blocks, we specify `chunks=5`, in other words, we have 5 elements per block. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a_da_sum = a_da.sum()\n",
    "a_da_sum"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Task Graphs\n",
    "\n",
    "In general, the code that humans write rely on compilers or interpreters so the computers can understand what we wrote. When we move to parallel execution there is a desire to shift responsibility from the compilers to the human, as they often bring the analysis, optimization, and execution of code into the code itself. In these cases, we often represent the structure of our program explicitly as data within the program itself.\n",
    "\n",
    "In Dask we use task scheduling, where we break our program into into many medium-sized tasks or units of computation.We represent these tasks as nodes in a graph with edges between nodes if one task depends on data produced by another. We call upon a task scheduler to execute this graph in a way that respects these data dependencies and leverages parallelism where possible, so multiple independent tasks can be run simultaneously."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# visualize the low level Dask graph using cytoscape\n",
    "a_da_sum.visualize(engine=\"cytoscape\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a_da_sum.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Performance comparison\n",
    "------------------------------\n",
    "\n",
    "Let's try a more interesting example. We will create a 20_000 x 20_000 array with normally distributed values, and take the mean along one of its axis.\n",
    "\n",
    "**Note:**\n",
    "\n",
    "If you are running on Binder, the Numpy example might need to be a smaller one due to memory issues. "
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Numpy version "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "xn = np.random.normal(10, 0.1, size=(30_000, 30_000))\n",
    "yn = xn.mean(axis=0)\n",
    "yn"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Dask array version"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))\n",
    "xd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "xd.nbytes / 1e9  # Gigabytes of the input processed lazily"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "yd = xd.mean(axis=0)\n",
    "yd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "xd = da.random.normal(10, 0.1, size=(30_000, 30_000), chunks=(3000, 3000))\n",
    "yd = xd.mean(axis=0)\n",
    "yd.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Questions to think about:**\n",
    "\n",
    "* What happens if the Dask chunks=(10000,10000)?\n",
    "* What happens if the Dask chunks=(30,30)?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Exercise:** \n",
    "\n",
    "For Dask arrays, compute the mean along `axis=1` of the sum of the x array and its transpose. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Your code here"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Solution**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "source_hidden": true
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "x_sum = xd + xd.T\n",
    "res = x_sum.mean(axis=1)\n",
    "res.compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Choosing good chunk sizes\n",
    "This section was inspired on a Dask blog by Genevieve Buckley you can read it [here](https://blog.dask.org/2021/11/02/choosing-dask-chunk-sizes)\n",
    "\n",
    "A common problem when getting started with Dask array is determine what is a good chunk size. But what is a good size, and how do we determine this? \n",
    "\n",
    "\n",
    "### Get to know the chunks \n",
    "\n",
    "We can think of Dask arrays as a big structure composed by chunks of a smaller size, where these chunks are typically an a single `numpy` array, and they are all arranged to form a larger Dask array. \n",
    "\n",
    "If you have a Dask array and want to know more information about chunks and their size, you can use the `chunksize` and `chunks` attributes to access this information. If you are in a jupyter notebook\n",
    "you can also visualize the Dask array via its HTML representation. "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr = da.random.random((1000, 1000, 1000))\n",
    "darr"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice that when we created the Dask array, we did not specify the `chunks`. Dask has set by default `chunks='auto'` which accommodates ideal chunk  sizes. To learn more on how auto-chunking works you can go to this documentation https://docs.dask.org/en/stable/array-chunks.html#automatic-chunking\n",
    "\n",
    "`darr.chunksize` shows the largest chunk size. If you expect your array to have uniform chunk sizes this is a a good summary of the chunk size information. But if your array have irregular chunks, `darr.chunks` will show you the explicit sizes of all the chunks along all the dimensions of your dask array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr.chunksize"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr.chunks"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's modify our example to see explore chunking a bit more. We can rechunk our array:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr = darr.rechunk({0: -1, 1: 100, 2: \"auto\"})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr.chunksize"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "darr.chunks"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "**Exercise:**\n",
    "\n",
    "- What does -1 do when specify as the chunk on a certain axis?"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Too small is a problem\n",
    "\n",
    "If your chunks are too small, the amount of actual work done by every task is very tiny, and the overhead of coordinating all these tasks results in a very inefficient process. \n",
    "\n",
    "In general, the dask scheduler takes approximately one millisecond to coordinate a single task. That means we want the computation time to be comparatively large, i.e in the order of seconds. \n",
    "\n",
    "Intuitive analogy by Genevieve Buckley:\n",
    "\n",
    "> Lets imagine we are building a house. It is a pretty big job, and if there were only one worker it would take much too long to build. So we have a team of workers and a site foreman. The site foreman is equivalent to the Dask scheduler: their job is to tell the workers what tasks they need to do.  \n",
    "Say we have a big pile of bricks to build a wall, sitting in the corner of the building site. If the foreman (the Dask scheduler) tells workers to go and fetch a single brick at a time, then bring each one to where the wall is being built, you can see how this is going to be very slow and inefficient! The workers are spending most of their time moving between the wall and the pile of bricks. Much less time is going towards doing the actual work of mortaring bricks onto the wall.  \n",
    "Instead, we can do this in a smarter way. The foreman (Dask scheduler) can tell the workers to go and bring one full wheelbarrow load of bricks back each time. Now workers are spending much less time moving between the wall and the pile of bricks, and the wall will be finished much quicker. \n",
    "\n",
    "### Too big is a problem\n",
    "\n",
    "If your chunks are too big, this is also a problem because you will likely run out of memory. You will start seeing in the dashboard that data is being spill to disk and this will lead to performance decrements. \n",
    "\n",
    "If we load to much data into memory, Dask workers will start to spill data to disk to avoid crashing. Spilling data to disk will slow things down significantly, because of all the extra read and write operations to disk. This is definitely a situation that we want to avoid, to watch out for this you can look at the worker memory plot on the dashboard. Orange bars are a warning you are close to the limit, and gray means data is being spilled to disk. \n",
    "\n",
    "To watch out for this, look at the worker memory plot on the Dask dashboard. Orange bars are a warning you are close to the limit, and gray means data is being spilled to disk - not good! For more tips, see the section on using the Dask dashboard below. To learn more about the memory plot, check the [dashboard documentation](https://docs.dask.org/en/stable/dashboard.html#bytes-stored-and-bytes-per-worker).\n",
    "\n",
    "\n",
    "### Rules of thumb\n",
    "\n",
    "- Users have reported that chunk sizes smaller than 1MB tend to be bad. In general, a chunk size between **100MB and 1GB is good**, while going over 1 or 2GB means you have a really big dataset and/or a lot of memory available per worker.\n",
    "- Upper bound: Avoid very large task graphs. More than 10,000 or 100,000 chunks may start to perform poorly.\n",
    "- Lower bound: To get the advantage of parallelization, you need the number of chunks to at least equal the number of worker cores available (or better, the number of worker cores times 2). Otherwise, some workers will stay idle.\n",
    "- The time taken to compute each task should be much larger than the time needed to schedule the task. The Dask scheduler takes roughly 1 millisecond to coordinate a single task, so a good task computation time would be in the order of seconds (not milliseconds).\n",
    "- Chunks should be aligned with array storage on disk. Modern NDArray storage formats (HDF5, NetCDF, TIFF, Zarr) allow arrays to be stored in chunks so that the blocks of data can be pulled efficiently. However, data stores often chunk more finely than is ideal for Dask array, so it is common to choose a chunking that is a multiple of your storage chunk size, otherwise you might incur high overhead. For example,  if you are loading data that is chunked in blocks of (100, 100), the  you might might choose a chunking strategy more like (1000, 2000) that is larger but still divisible by (100, 100). \n",
    "\n",
    "For more more advice on chunking see https://docs.dask.org/en/stable/array-chunks.html"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example of chunked data with Zarr\n",
    "\n",
    "Zarr is a format for the storage of chunked, compressed, N-dimensional arrays. Zarr provides classes and functions for working with N-dimensional arrays that behave like NumPy arrays (Dask array behave like Numpy arrays) but whose data is divided into chunks and each chunk is compressed. If you are already familiar with HDF5 then Zarr arrays provide similar functionality, but with some additional flexibility.\n",
    "\n",
    "For extra material check the [Zarr tutorial](https://zarr.readthedocs.io/en/stable/tutorial.html)\n",
    "\n",
    "**Let's read an array from zarr:**"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import zarr"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a = da.from_zarr(\"data/random.zarr\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "a"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Notice that the array is already chunked, and we didn't specify anything when loading it. Now notice that the chunks have a nice chunk size, let's compute the mean and see how long it takes to run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "a.mean().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's load a separate example where the `chunksize` is much smaller, and see what happen"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "b = da.from_zarr(\"data/random_sc.zarr\")\n",
    "b"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "b.mean().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Exercise:\n",
    "\n",
    "Provide a `chunksize` when reading `b` that will improve the time of computation of the mean. Try multiple `chunks` values and see what happens."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Your code here"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "jupyter": {
     "source_hidden": true
    },
    "tags": []
   },
   "outputs": [],
   "source": [
    "# 1 possible Solution (imitate original). chunks will vary if you are in binder\n",
    "c = da.from_zarr(\"data/random_sc.zarr\", chunks=(6250000,))\n",
    "c"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%time\n",
    "c.mean().compute()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Xarray  \n",
    "\n",
    "In some applications we have multidimensional data, and sometimes working with all this dimensions can be confusing. Xarray is an open source project and Python package that makes working with labeled multi-dimensional arrays easier. \n",
    "\n",
    "Xarray is inspired by and borrows heavily from pandas, the popular data analysis package focused on labeled tabular data. It is particularly tailored to working with netCDF files, which were the source of xarray’s data model, and integrates tightly with Dask for parallel computing.\n",
    "\n",
    "Xarray introduces labels in the form of dimensions, coordinates and attributes on top of raw NumPy-like arrays, which allows for a more intuitive, more concise, and less error-prone developer experience. \n",
    "\n",
    "Let's learn how to use xarray and Dask together:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import xarray as xr"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ds = xr.tutorial.open_dataset(\n",
    "    \"air_temperature\",\n",
    "    chunks={  # this tells xarray to open the dataset as a dask array\n",
    "        \"lat\": 25,\n",
    "        \"lon\": 25,\n",
    "        \"time\": -1,\n",
    "    },\n",
    ")\n",
    "ds"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ds.air"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "ds.air.chunks"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "mean = ds.air.mean(\"time\")  # no activity on dashboard\n",
    "mean  # contains a dask array"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# we will see dashboard activity\n",
    "mean.load()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Standard Xarray Operations\n",
    "\n",
    "Let's grab the air variable and do some operations. Operations using xarray objects are identical, regardless if the underlying data is stored as a Dask array or a NumPy array."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dair = ds.air"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dair2 = dair.groupby(\"time.month\").mean(\"time\")\n",
    "dair_new = dair - dair2\n",
    "dair_new"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Call `.compute()` or `.load()` when you want your result as a `xarray.DataArray` with data stored as NumPy arrays."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# things happen in the dashboard\n",
    "dair_new.load()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Time Series Operations with xarray\n",
    "\n",
    "Because we have a datetime index time-series operations work efficiently, for example we can do a resample and then plot the result."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dair_resample = dair.resample(time=\"1w\").mean(\"time\").std(\"time\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "dair_resample.load().plot(figsize=(12, 8))"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Learn More \n",
    "\n",
    "Both xarray and zarr have their own tutorials that go into greater depth:\n",
    "\n",
    "* [Zarr tutorial](https://zarr.readthedocs.io/en/stable/tutorial.html)\n",
    "* [Xarray tutorial](https://tutorial.xarray.dev/intro.html)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Close your cluster\n",
    "\n",
    "It's good practice to close any Dask cluster you create:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client.shutdown()"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
